/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***********************************************************************
 *   $Id: tdriver-bru.cpp 9210 2013-01-21 14:10:42Z rdempsey $
 *
 *
 ***********************************************************************/
#include <iostream>
#include <sstream>
#include <fstream>
#include <iomanip>
#include <vector>
#include <set>

#include <pthread.h>
#include <time.h>
#include <sys/time.h>

#include <boost/filesystem/path.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/thread/mutex.hpp>

#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>

#include "bucketdl.h"
#include "elementtype.h"
#include "stopwatch.cpp"

#include "bucketreuse.h"

// #undef CPPUNIT_ASSERT
// #define CPPUNIT_ASSERT(x)

using namespace std;
using namespace boost;
using namespace joblist;

// Stopwatch timer;

//------------------------------------------------------------------------------
// TestDriver class derived from CppUnit
//------------------------------------------------------------------------------

class BucketReUseDriver : public CppUnit::TestFixture
{
  CPPUNIT_TEST_SUITE(BucketReUseDriver);
  CPPUNIT_TEST(parseConfig);
  CPPUNIT_TEST(createFiles);
  CPPUNIT_TEST(reuseFiles);
  CPPUNIT_TEST(newversion);
  CPPUNIT_TEST(concurrent);
  CPPUNIT_TEST(concurrent_newversion);
  CPPUNIT_TEST(concurrent_race);
  CPPUNIT_TEST_SUITE_END();

 private:
 public:
  //--------------------------------------------------------------------------
  // setup method run prior to each unit test, inherited from base
  //--------------------------------------------------------------------------
  void setUp()
  {
    clock_gettime(CLOCK_REALTIME, &ts);
  }

  //--------------------------------------------------------------------------
  // validate results from a unit test, inherited from base
  //--------------------------------------------------------------------------
  void validateResults()
  {
  }

  //--------------------------------------------------------------------------
  // test functions
  //--------------------------------------------------------------------------
  void parseConfig();
  void createFiles();
  void reuseFiles();
  void newversion();
  void concurrent();
  void concurrent_newversion();
  void concurrent_race();

 private:
  //--------------------------------------------------------------------------
  // initialize method
  // oid : reference to the column OID used for test
  //--------------------------------------------------------------------------
  void initControl(execplan::CalpontSystemCatalog::OID& oid);

  //--------------------------------------------------------------------------
  // validate results
  // files: the filenames to be verified if exist
  // exist: expect if the files are created or deleted
  //--------------------------------------------------------------------------
  void validateFileExist(set<string>& files, bool exist);

  static void* insertThread(void*);
  static void* readThread(void*);

  static void* scanThread(void*);
  static void* reuseThread(void*);
  static void* raceThread(void*);

  struct ThreadArg
  {
    uint64_t id;                              // thread id
    uint64_t version;                         // db version
    uint64_t buckets;                         // max bucket numbers
    uint64_t elements;                        // max number of elem per bucket
    uint64_t total;                           // total number of elements
    execplan::CalpontSystemCatalog::OID oid;  // column OID
    BucketDataList* dl;                       // datalist

    // for sync threads
    bool* flag;
    pthread_mutex_t* mutex;
    pthread_cond_t* cond;

    // for file status check
    set<string>* files;

    ThreadArg()
     : id(0)
     , version(0)
     , buckets(0)
     , elements(0)
     , total(0)
     , dl(NULL)
     , flag(NULL)
     , mutex(NULL)
     , cond(NULL)
     , files(NULL)
    {
    }
  };

  static const string column;

 public:
  static struct timespec ts;
};

CPPUNIT_TEST_SUITE_REGISTRATION(BucketReUseDriver);

const string BucketReUseDriver::column = "tpch.lineitem.l_orderkey";
struct timespec BucketReUseDriver::ts;

//------------------------------------------------------------------------------
// main entry point
//------------------------------------------------------------------------------
int main(int argc, char** argv)
{
  CppUnit::TextUi::TestRunner runner;
  CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
  runner.addTest(registry.makeTest());
  bool wasSuccessful = runner.run("", false);
  return (wasSuccessful ? 0 : 1);
}

struct timespec atTime();
ostream& operator<<(ostream& os, const struct timespec& t);

void BucketReUseDriver::parseConfig()
{
  cout << "ut: parseConfig start...\n" << endl;

  // before run this test, make sure "tpch.lineitem.l_orderkey" is in the Columnstore.xml
  BucketReuseManager* control = BucketReuseManager::instance();
  ResourceManager rm;
  BucketReuseManager::instance()->startup(rm);

  // list all the predicates if any listed in the Columnstore.xml file
  for (BucketReuseMap::iterator it = control->fControlMap.begin(); it != control->fControlMap.end(); it++)
    cout << *(it->second);

  cout << endl;

  size_t schemap = column.find_first_of(".");
  size_t columnp = column.find_last_of(".");
  CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);

  execplan::CalpontSystemCatalog::TableColName columnName;
  columnName.schema = column.substr(0, schemap);
  columnName.table = column.substr(schemap + 1, columnp - schemap - 1);
  columnName.column = column.substr(columnp + 1);

  string filter = "allrows";

  CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) != control->fControlMap.end());

  cout << "ut: parseConfig done!\n" << endl;
}

void BucketReUseDriver::createFiles()
{
  cout << "ut: createFiles start...\n" << endl;

  ThreadArg arg;
  arg.id = 1;
  arg.version = 1;
  arg.buckets = 2;
  arg.elements = 2;
  arg.total = 1000000;

  execplan::CalpontSystemCatalog::OID oid;
  initControl(oid);
  arg.oid = oid;

  set<string> files;
  arg.files = &files;

  pthread_t t;
  pthread_create(&t, NULL, scanThread, &arg);
  pthread_join(t, NULL);

  validateFileExist(files, true);

  cout << "ut: createFiles done!\n" << endl;
}

void BucketReUseDriver::reuseFiles()
{
  cout << "ut: reuseFiles start...\n" << endl;

  ThreadArg arg1;
  arg1.id = 1;
  arg1.version = 1;
  arg1.buckets = 2;
  arg1.elements = 2;
  arg1.total = 1000000;

  execplan::CalpontSystemCatalog::OID oid;
  initControl(oid);
  arg1.oid = oid;

  set<string> files1;
  arg1.files = &files1;

  // create the files
  pthread_t t1;
  pthread_create(&t1, NULL, scanThread, &arg1);
  pthread_join(t1, NULL);

  validateFileExist(files1, true);

  // use new datalist, reuse case
  ThreadArg arg2 = arg1;
  arg2.id = 2;

  set<string> files2;
  arg2.files = &files2;

  pthread_t t2;
  pthread_create(&t2, NULL, reuseThread, &arg2);
  pthread_join(t2, NULL);

  validateFileExist(files2, true);

  cout << "ut: reuseFiles done!\n" << endl;
}

void BucketReUseDriver::newversion()
{
  cout << "ut: newversion start...\n" << endl;

  ThreadArg arg1;
  arg1.id = 1;
  arg1.version = 1;
  arg1.buckets = 2;
  arg1.elements = 2;
  arg1.total = 1000000;

  execplan::CalpontSystemCatalog::OID oid;
  initControl(oid);
  arg1.oid = oid;

  set<string> files1;
  arg1.files = &files1;

  // create the files
  pthread_t t1;
  pthread_create(&t1, NULL, scanThread, &arg1);
  pthread_join(t1, NULL);

  validateFileExist(files1, true);

  // new version
  ThreadArg arg2 = arg1;
  arg2.id = 2;
  arg2.version = 2;

  set<string> files2;
  arg2.files = &files2;

  pthread_t t2;
  pthread_create(&t2, NULL, scanThread, &arg2);
  pthread_join(t2, NULL);

  pthread_yield();

  validateFileExist(files1, false);
  validateFileExist(files2, true);

  // read from the new files with new datalist
  ThreadArg arg3 = arg2;
  arg3.id = 3;
  arg3.dl = NULL;

  set<string> files3;
  arg3.files = &files3;

  pthread_t t3;
  pthread_create(&t3, NULL, reuseThread, &arg3);
  pthread_join(t3, NULL);

  validateFileExist(files3, true);

  cout << "ut: newversion done!\n" << endl;
}

void BucketReUseDriver::concurrent()
{
  cout << "ut: concurrent start...\n" << endl;

  ThreadArg arg1;
  arg1.id = 1;
  arg1.version = 1;
  arg1.buckets = 2;
  arg1.elements = 2;
  arg1.total = 1000000;

  execplan::CalpontSystemCatalog::OID oid;
  initControl(oid);
  arg1.oid = oid;

  set<string> files1;
  arg1.files = &files1;

  // create the files
  pthread_t t1;
  pthread_create(&t1, NULL, scanThread, &arg1);

  sleep(1);

  // reuse case, current thread
  ThreadArg arg2 = arg1;
  arg2.id = 2;
  set<string> files2;
  arg2.files = &files2;

  pthread_t t2;
  pthread_create(&t2, NULL, reuseThread, &arg2);

  pthread_join(t1, NULL);
  validateFileExist(files1, true);

  pthread_join(t2, NULL);
  validateFileExist(files2, true);

  cout << "ut: concurrent done!\n" << endl;
}

void BucketReUseDriver::concurrent_newversion()
{
  cout << "ut: concurrent_newversion start...\n" << endl;

  bool flag = false;
  pthread_mutex_t mutex;
  pthread_mutex_init(&mutex, 0);
  pthread_cond_t cond;
  pthread_cond_init(&cond, 0);

  ThreadArg arg1;
  arg1.id = 1;
  arg1.version = 1;
  arg1.buckets = 2;
  arg1.elements = 2;
  arg1.total = 1000000;

  execplan::CalpontSystemCatalog::OID oid;
  initControl(oid);
  arg1.oid = oid;

  set<string> files1;
  arg1.files = &files1;

  // create the files
  pthread_t t1;
  pthread_create(&t1, NULL, scanThread, &arg1);

  // reuse case, current thread
  ThreadArg arg2 = arg1;
  arg2.id = 2;
  set<string> files2;
  arg2.files = &files2;

  sleep(1);

  pthread_t t2;
  pthread_create(&t2, NULL, reuseThread, &arg2);

  // new version
  ThreadArg arg3 = arg1;
  arg3.id = 3;
  arg3.version = 3;
  arg3.flag = &flag;
  arg3.mutex = &mutex;
  arg3.cond = &cond;
  set<string> files3;
  arg3.files = &files3;

  pthread_t t3;
  pthread_create(&t3, NULL, scanThread, &arg3);

  sleep(3);

  pthread_mutex_lock(&mutex);
  flag = true;
  pthread_cond_broadcast(&cond);
  pthread_mutex_unlock(&mutex);

  pthread_join(t1, NULL);
  pthread_join(t2, NULL);
  pthread_join(t3, NULL);

  // let cleanup thread do its job
  pthread_yield();

  validateFileExist(files1, false);
  validateFileExist(files2, false);
  validateFileExist(files3, true);

  pthread_mutex_destroy(&mutex);
  pthread_cond_destroy(&cond);

  cout << "ut: concurrent_newversion done!\n" << endl;
}

void BucketReUseDriver::concurrent_race()
{
  cout << "ut: concurrent_race start...\n" << endl;

  ThreadArg arg1;
  arg1.id = 1;
  arg1.version = 1;
  arg1.buckets = 2;
  arg1.elements = 2;
  arg1.total = 2000000;

  execplan::CalpontSystemCatalog::OID oid;
  initControl(oid);
  arg1.oid = oid;

  set<string> files1;
  arg1.files = &files1;

  ThreadArg arg2 = arg1;
  arg2.id = 2;
  set<string> files2;
  arg2.files = &files2;

  // start the version 1 threads
  pthread_t t1;
  pthread_t t2;
  pthread_create(&t1, NULL, raceThread, &arg1);
  pthread_create(&t2, NULL, raceThread, &arg2);

  // let the version 1 threads register
  sleep(1);

  ThreadArg arg3 = arg1;
  arg3.id = 3;
  arg3.version = 4;
  arg3.total = 1000000;
  set<string> files3;
  arg3.files = &files3;

  ThreadArg arg4 = arg3;
  arg4.id = 4;
  set<string> files4;
  arg4.files = &files4;

  // start the version 4 threads
  pthread_t t3;
  pthread_t t4;
  pthread_create(&t3, NULL, raceThread, &arg3);
  pthread_create(&t4, NULL, raceThread, &arg4);

  pthread_join(t3, NULL);
  pthread_join(t4, NULL);
  validateFileExist(files1, true);
  validateFileExist(files4, true);

  pthread_join(t1, NULL);
  pthread_join(t2, NULL);
  validateFileExist(files1, false);
  validateFileExist(files4, true);

  cout << "ut: concurrent_race done!\n" << endl;
}

void BucketReUseDriver::initControl(execplan::CalpontSystemCatalog::OID& oid)
{
  // make sure the column for testing is in the map, "column" is a class variable
  // to test with another column, only one place to change
  // -- to validate the config parsing, run parseConfig --
  config::Config* cf = config::Config::makeConfig();
  vector<string> columns;
  cf->getConfig("HashBucketReuse", "Predicate", columns);

  bool found = true;

  for (vector<string>::iterator it = columns.begin(); it != columns.end(); ++it)
  {
    if (it->compare(0, column.size(), column) == 0)
    {
      found = true;
      break;
    }
  }

  string filter = "allrows";
  BucketReuseManager* control = BucketReuseManager::instance();

  if (found == false)
  {
    cout << "tpch.lineitem.l_orderkey is not in the Columnstore.xml!)" << endl;
    cout << "insert it to countinue unit test" << endl;

    size_t schemap = column.find_first_of(".");
    size_t columnp = column.find_last_of(".");
    CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);

    execplan::CalpontSystemCatalog::TableColName tcn;
    tcn.schema = column.substr(0, schemap);
    tcn.table = column.substr(schemap + 1, columnp - schemap - 1);
    tcn.column = column.substr(columnp + 1);

    control->fConfigMap.insert(pair<string, BucketFileKey>(column, BucketFileKey(tcn, filter)));
  }

  ResourceManager rm;
  // now  start the BucketReuseManager
  BucketReuseManager::instance()->startup(rm);

  // get the oid for registration
  size_t schemap = column.find_first_of(".");
  size_t columnp = column.find_last_of(".");
  CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);

  execplan::CalpontSystemCatalog::TableColName columnName;
  columnName.schema = column.substr(0, schemap);
  columnName.table = column.substr(schemap + 1, columnp - schemap - 1);
  columnName.column = column.substr(columnp + 1);

  CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) != control->fControlMap.end());
}

void BucketReUseDriver::validateFileExist(set<string>& files, bool exist)
{
  cout << "\ncheck if files exist or not:" << endl;

  for (set<string>::iterator i = files.begin(); i != files.end(); i++)
  {
    filesystem::path p(i->c_str());
    cout << (*i) << "-- ";

    if (exist)
    {
      CPPUNIT_ASSERT(filesystem::exists(p));
      cout << "OK" << endl;
    }
    else
    {
      CPPUNIT_ASSERT(!filesystem::exists(p));
      cout << "GONE" << endl;
    }
  }

  cout << endl;
}

void* BucketReUseDriver::insertThread(void* arg)
{
  ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
  BucketDataList* dl = a->dl;
  CPPUNIT_ASSERT(dl != NULL);

  cout << "thread " << a->id << " start at " << atTime() << endl;

  BucketReuseControlEntry* entry = dl->reuseControl();

  for (uint64_t i = 0; i < a->buckets; i++)
  {
    stringstream ss;
    ss << entry->baseName() << "." << i;
    filesystem::path p(ss.str().c_str());
    a->files->insert(ss.str());
  }

  ElementType e;

  for (uint64_t i = 0; i < a->total; i++)
  {
    e.first = i;
    e.second = i * 10 + a->version;  // include the version in values
    dl->insert(e);
  }

  cout << "thread[" << a->id << "] last element inserted at " << atTime() << endl;
  dl->endOfInput();

  cout << "thread " << a->id << " finished at " << atTime() << endl;

  return NULL;
}

void* BucketReUseDriver::readThread(void* arg)
{
  ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
  BucketDataList* dl = a->dl;
  CPPUNIT_ASSERT(dl != NULL);

  cout << "thread " << a->id << " start at " << atTime() << endl;

  BucketReuseControlEntry* entry = dl->reuseControl();

  for (uint64_t i = 0; i < a->buckets; i++)
  {
    stringstream ss;
    ss << entry->baseName() << "." << i;
    filesystem::path p(ss.str().c_str());
    a->files->insert(ss.str());
  }

  ElementType e;
  uint64_t min = 0xffffffff, max = 0, count = 0;
  uint64_t k[a->buckets];  // count of each bucket
  bool firstRead = true;

  for (uint64_t i = 0; i < a->buckets; i++)
  {
    k[i] = 0;
    uint64_t it = dl->getIterator(i);

    while (dl->next(i, it, &e))
    {
      if (firstRead)
      {
        cout << "thread[" << a->id << "] first read at " << atTime() << endl;
        firstRead = false;
      }

      if (e.second < min)
        min = e.second;

      if (e.second > max)
        max = e.second;

      // output the first 10 of each bucket or last 10 of the datalist
      // if (count < 10 || (a->total - count) < 10 || k[i] < 10)
      if (count < 2 || (a->total - count) < 2 || k[i] < 2)
        cout << "thread[" << a->id << "] bucket:" << i << " e(" << e.first << ", " << e.second << ")" << endl;

      count++;
      k[i]++;
    }
  }

  cout << "\nthread[" << a->id << "] element: count = " << count << ", min/max = " << min << "/" << max
       << ", elements in each bucket: ";

  for (uint64_t i = 0; i < a->buckets; i++)
    cout << k[i] << " ";

  cout << endl;

  cout << "thread " << a->id << " finished at " << atTime() << endl;

  return NULL;
}

void* BucketReUseDriver::scanThread(void* arg)
{
  ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
  BucketDataList* dl = a->dl;
  CPPUNIT_ASSERT(dl == NULL);

  if (a->cond != NULL)
  {
    pthread_mutex_lock(a->mutex);

    while (*(a->flag) != true)
      pthread_cond_wait(a->cond, a->mutex);

    pthread_mutex_unlock(a->mutex);
  }

  cout << "thread " << a->id << " start at " << atTime() << endl;

  string dummy;
  bool scan = false;
  boost::shared_ptr<execplan::CalpontSystemCatalog> c =
      execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
  execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
  BucketReuseControlEntry* entry =
      BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);
  CPPUNIT_ASSERT(scan == true);

  ResourceManager rm;
  dl = new BucketDataList(a->buckets, 1, a->elements, rm);
  dl->setElementMode(1);
  dl->reuseControl(entry, !scan);

  ThreadArg arg1 = *a;
  arg1.id *= 10;
  arg1.dl = dl;

  pthread_t t1;
  pthread_create(&t1, NULL, insertThread, &arg1);

  ThreadArg arg2 = arg1;
  arg2.id += 1;
  pthread_t t2;
  pthread_create(&t2, NULL, readThread, &arg2);

  pthread_join(t1, NULL);
  pthread_join(t2, NULL);

  delete dl;

  cout << "thread " << a->id << " finished at " << atTime() << endl;

  return NULL;
}

void* BucketReUseDriver::reuseThread(void* arg)
{
  ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
  BucketDataList* dl = a->dl;
  CPPUNIT_ASSERT(dl == NULL);

  if (a->cond != NULL)
  {
    pthread_mutex_lock(a->mutex);

    while (*(a->flag) != true)
      pthread_cond_wait(a->cond, a->mutex);

    pthread_mutex_unlock(a->mutex);
  }

  cout << "thread " << a->id << " start at " << atTime() << endl;

  string dummy;
  bool scan = true;
  boost::shared_ptr<execplan::CalpontSystemCatalog> c =
      execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
  execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
  BucketReuseControlEntry* entry =
      BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);
  CPPUNIT_ASSERT(scan == false);

  ResourceManager rm;
  dl = new BucketDataList(a->buckets, 1, a->elements, rm);
  dl->setElementMode(1);
  dl->reuseControl(entry, !scan);

  ThreadArg arg1 = *a;
  arg1.id *= 10;
  arg1.dl = dl;

  pthread_t t1;
  pthread_create(&t1, NULL, readThread, &arg1);

  if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
  {
    boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
    dl->reuseControl()->stateChange().wait(lock);
  }
  else
  {
    CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
                   (entry->fileStatus() == BucketReuseControlEntry::ready_c));
  }

  // the bucket files are ready
  dl->restoreBucketInformation();
  dl->endOfInput();

  pthread_join(t1, NULL);

  delete dl;

  cout << "thread " << a->id << " finished at " << atTime() << endl;

  return NULL;
}

void* BucketReUseDriver::raceThread(void* arg)
{
  ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
  BucketDataList* dl = a->dl;
  CPPUNIT_ASSERT(dl == NULL);

  if (a->cond != NULL)
  {
    pthread_mutex_lock(a->mutex);

    while (*(a->flag) != true)
      pthread_cond_wait(a->cond, a->mutex);

    pthread_mutex_unlock(a->mutex);
  }

  cout << "thread " << a->id << " start at " << atTime() << endl;

  string dummy;
  bool scan = true;
  ResourceManager rm;
  BucketReuseControlEntry* entry = NULL;
  {
    boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
    boost::shared_ptr<execplan::CalpontSystemCatalog> c =
        execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
    execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
    entry = BucketReuseManager::instance()->userRegister(tcn, dummy, a->version, a->buckets, scan);

    dl = new BucketDataList(a->buckets, 1, a->elements, rm);
    dl->setElementMode(1);
    dl->reuseControl(entry, !scan);
  }

  if (scan == true)
  {
    CPPUNIT_ASSERT(entry->fileStatus() == BucketReuseControlEntry::progress_c);

    ThreadArg arg1 = *a;
    arg1.id *= 10;
    arg1.dl = dl;

    pthread_t t1;
    pthread_create(&t1, NULL, insertThread, &arg1);

    ThreadArg arg2 = arg1;
    arg2.id += 1;
    pthread_t t2;
    pthread_create(&t2, NULL, readThread, &arg2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
  }
  else
  {
    ThreadArg arg1 = *a;
    arg1.id *= 10;
    arg1.dl = dl;

    pthread_t t1;
    pthread_create(&t1, NULL, readThread, &arg1);

    if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
    {
      boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
      dl->reuseControl()->stateChange().wait(lock);
    }
    else
    {
      CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
                     (entry->fileStatus() == BucketReuseControlEntry::ready_c));
    }

    // the bucket files are ready
    dl->restoreBucketInformation();
    dl->endOfInput();

    pthread_join(t1, NULL);
  }

  delete dl;

  cout << "thread " << a->id << " finished at " << atTime() << endl;

  return NULL;
}

timespec atTime()
{
  timespec ts, ts1, ts2;
  ts1 = BucketReUseDriver::ts;
  clock_gettime(CLOCK_REALTIME, &ts2);

  if (ts2.tv_nsec < ts1.tv_nsec)
  {
    ts.tv_sec = ts2.tv_sec - ts1.tv_sec - 1;
    ts.tv_nsec = ts2.tv_nsec + 1000000000 - ts1.tv_nsec;
  }
  else
  {
    ts.tv_sec = ts2.tv_sec - ts1.tv_sec;
    ts.tv_nsec = ts2.tv_nsec - ts1.tv_nsec;
  }

  return ts;
}

ostream& operator<<(ostream& os, const struct timespec& t)
{
  os << t.tv_sec << "." << setw(9) << setfill('0') << t.tv_nsec << "s";
  return os;
}
